1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.io.BufferedReader;
22 import java.io.File;
23 import java.io.InterruptedIOException;
24 import java.io.IOException;
25 import java.io.InputStreamReader;
26 import java.io.OutputStream;
27 import java.io.Reader;
28 import java.net.BindException;
29 import java.net.InetSocketAddress;
30 import java.net.Socket;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Random;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.hbase.classification.InterfaceStability;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.HConstants;
41 import org.apache.zookeeper.server.NIOServerCnxnFactory;
42 import org.apache.zookeeper.server.ZooKeeperServer;
43 import org.apache.zookeeper.server.persistence.FileTxnLog;
44
45 import com.google.common.annotations.VisibleForTesting;
46
47
48
49
50
51
52 @InterfaceAudience.Public
53 @InterfaceStability.Evolving
54 public class MiniZooKeeperCluster {
55 private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
56
57 private static final int TICK_TIME = 2000;
58 private static final int CONNECTION_TIMEOUT = 30000;
59
60 private boolean started;
61
62
63 private int defaultClientPort = 0;
64
65 private List<NIOServerCnxnFactory> standaloneServerFactoryList;
66 private List<ZooKeeperServer> zooKeeperServers;
67 private List<Integer> clientPortList;
68
69 private int activeZKServerIndex;
70 private int tickTime = 0;
71
72 private Configuration configuration;
73
74 public MiniZooKeeperCluster() {
75 this(new Configuration());
76 }
77
78 public MiniZooKeeperCluster(Configuration configuration) {
79 this.started = false;
80 this.configuration = configuration;
81 activeZKServerIndex = -1;
82 zooKeeperServers = new ArrayList<ZooKeeperServer>();
83 clientPortList = new ArrayList<Integer>();
84 standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
85 }
86
87
88
89
90
91
92 public void addClientPort(int clientPort) {
93 clientPortList.add(clientPort);
94 }
95
96
97
98
99
100 @VisibleForTesting
101 public List<Integer> getClientPortList() {
102 return clientPortList;
103 }
104
105
106
107
108
109
110 private boolean hasValidClientPortInList(int index) {
111 return (clientPortList.size() > index && clientPortList.get(index) > 0);
112 }
113
114 public void setDefaultClientPort(int clientPort) {
115 if (clientPort <= 0) {
116 throw new IllegalArgumentException("Invalid default ZK client port: "
117 + clientPort);
118 }
119 this.defaultClientPort = clientPort;
120 }
121
122
123
124
125
126
127
128 private int selectClientPort(int seedPort) {
129 int i;
130 int returnClientPort = seedPort + 1;
131 if (returnClientPort == 0) {
132
133
134
135
136 if (defaultClientPort > 0) {
137 returnClientPort = defaultClientPort;
138 } else {
139 returnClientPort = 0xc000 + new Random().nextInt(0x3f00);
140 }
141 }
142
143 while (true) {
144 for (i = 0; i < clientPortList.size(); i++) {
145 if (returnClientPort == clientPortList.get(i)) {
146
147 returnClientPort++;
148 break;
149 }
150 }
151 if (i == clientPortList.size()) {
152 break;
153 }
154 }
155 return returnClientPort;
156 }
157
158 public void setTickTime(int tickTime) {
159 this.tickTime = tickTime;
160 }
161
162 public int getBackupZooKeeperServerNum() {
163 return zooKeeperServers.size()-1;
164 }
165
166 public int getZooKeeperServerNum() {
167 return zooKeeperServers.size();
168 }
169
170
171 private static void setupTestEnv() {
172
173
174
175
176 System.setProperty("zookeeper.preAllocSize", "100");
177 FileTxnLog.setPreallocSize(100 * 1024);
178 }
179
180 public int startup(File baseDir) throws IOException, InterruptedException {
181 int numZooKeeperServers = clientPortList.size();
182 if (numZooKeeperServers == 0) {
183 numZooKeeperServers = 1;
184 }
185 return startup(baseDir, numZooKeeperServers);
186 }
187
188
189
190
191
192
193
194
195
196 public int startup(File baseDir, int numZooKeeperServers) throws IOException,
197 InterruptedException {
198 if (numZooKeeperServers <= 0)
199 return -1;
200
201 setupTestEnv();
202 shutdown();
203
204 int tentativePort = -1;
205 int currentClientPort;
206
207
208 for (int i = 0; i < numZooKeeperServers; i++) {
209 File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
210 createDir(dir);
211 int tickTimeToUse;
212 if (this.tickTime > 0) {
213 tickTimeToUse = this.tickTime;
214 } else {
215 tickTimeToUse = TICK_TIME;
216 }
217
218
219 if (hasValidClientPortInList(i)) {
220 currentClientPort = clientPortList.get(i);
221 } else {
222 tentativePort = selectClientPort(tentativePort);
223 currentClientPort = tentativePort;
224 }
225
226 ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
227 NIOServerCnxnFactory standaloneServerFactory;
228 while (true) {
229 try {
230 standaloneServerFactory = new NIOServerCnxnFactory();
231 standaloneServerFactory.configure(
232 new InetSocketAddress(currentClientPort),
233 configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 1000));
234 } catch (BindException e) {
235 LOG.debug("Failed binding ZK Server to client port: " +
236 currentClientPort, e);
237
238 if (hasValidClientPortInList(i)) {
239 return -1;
240 }
241
242 tentativePort = selectClientPort(tentativePort);
243 currentClientPort = tentativePort;
244 continue;
245 }
246 break;
247 }
248
249
250 standaloneServerFactory.startup(server);
251
252 if (!waitForServerUp(currentClientPort, CONNECTION_TIMEOUT)) {
253 throw new IOException("Waiting for startup of standalone server");
254 }
255
256
257 if (clientPortList.size() <= i) {
258 clientPortList.add(currentClientPort);
259 }
260 else if (clientPortList.get(i) <= 0) {
261 clientPortList.remove(i);
262 clientPortList.add(i, currentClientPort);
263 }
264
265 standaloneServerFactoryList.add(standaloneServerFactory);
266 zooKeeperServers.add(server);
267 }
268
269
270 activeZKServerIndex = 0;
271 started = true;
272 int clientPort = clientPortList.get(activeZKServerIndex);
273 LOG.info("Started MiniZooKeeperCluster and ran successful 'stat' " +
274 "on client port=" + clientPort);
275 return clientPort;
276 }
277
278 private void createDir(File dir) throws IOException {
279 try {
280 if (!dir.exists()) {
281 dir.mkdirs();
282 }
283 } catch (SecurityException e) {
284 throw new IOException("creating dir: " + dir, e);
285 }
286 }
287
288
289
290
291 public void shutdown() throws IOException {
292 if (!started) {
293 return;
294 }
295
296
297 for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
298 NIOServerCnxnFactory standaloneServerFactory =
299 standaloneServerFactoryList.get(i);
300 int clientPort = clientPortList.get(i);
301
302 standaloneServerFactory.shutdown();
303 if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
304 throw new IOException("Waiting for shutdown of standalone server");
305 }
306 }
307 for (ZooKeeperServer zkServer: zooKeeperServers) {
308
309 zkServer.getZKDatabase().close();
310 }
311
312
313 started = false;
314 activeZKServerIndex = 0;
315 standaloneServerFactoryList.clear();
316 clientPortList.clear();
317 zooKeeperServers.clear();
318
319 LOG.info("Shutdown MiniZK cluster with all ZK servers");
320 }
321
322
323
324
325
326
327 public int killCurrentActiveZooKeeperServer() throws IOException,
328 InterruptedException {
329 if (!started || activeZKServerIndex < 0) {
330 return -1;
331 }
332
333
334 NIOServerCnxnFactory standaloneServerFactory =
335 standaloneServerFactoryList.get(activeZKServerIndex);
336 int clientPort = clientPortList.get(activeZKServerIndex);
337
338 standaloneServerFactory.shutdown();
339 if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
340 throw new IOException("Waiting for shutdown of standalone server");
341 }
342
343 zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
344
345
346 standaloneServerFactoryList.remove(activeZKServerIndex);
347 clientPortList.remove(activeZKServerIndex);
348 zooKeeperServers.remove(activeZKServerIndex);
349 LOG.info("Kill the current active ZK servers in the cluster " +
350 "on client port: " + clientPort);
351
352 if (standaloneServerFactoryList.size() == 0) {
353
354 return -1;
355 }
356 clientPort = clientPortList.get(activeZKServerIndex);
357 LOG.info("Activate a backup zk server in the cluster " +
358 "on client port: " + clientPort);
359
360 return clientPort;
361 }
362
363
364
365
366
367
368 public void killOneBackupZooKeeperServer() throws IOException,
369 InterruptedException {
370 if (!started || activeZKServerIndex < 0 ||
371 standaloneServerFactoryList.size() <= 1) {
372 return ;
373 }
374
375 int backupZKServerIndex = activeZKServerIndex+1;
376
377 NIOServerCnxnFactory standaloneServerFactory =
378 standaloneServerFactoryList.get(backupZKServerIndex);
379 int clientPort = clientPortList.get(backupZKServerIndex);
380
381 standaloneServerFactory.shutdown();
382 if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
383 throw new IOException("Waiting for shutdown of standalone server");
384 }
385
386 zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
387
388
389 standaloneServerFactoryList.remove(backupZKServerIndex);
390 clientPortList.remove(backupZKServerIndex);
391 zooKeeperServers.remove(backupZKServerIndex);
392 LOG.info("Kill one backup ZK servers in the cluster " +
393 "on client port: " + clientPort);
394 }
395
396
397 private static boolean waitForServerDown(int port, long timeout) throws IOException {
398 long start = System.currentTimeMillis();
399 while (true) {
400 try {
401 Socket sock = new Socket("localhost", port);
402 try {
403 OutputStream outstream = sock.getOutputStream();
404 outstream.write("stat".getBytes());
405 outstream.flush();
406 } finally {
407 sock.close();
408 }
409 } catch (IOException e) {
410 return true;
411 }
412
413 if (System.currentTimeMillis() > start + timeout) {
414 break;
415 }
416 try {
417 Thread.sleep(250);
418 } catch (InterruptedException e) {
419 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
420 }
421 }
422 return false;
423 }
424
425
426 private static boolean waitForServerUp(int port, long timeout) throws IOException {
427 long start = System.currentTimeMillis();
428 while (true) {
429 try {
430 Socket sock = new Socket("localhost", port);
431 BufferedReader reader = null;
432 try {
433 OutputStream outstream = sock.getOutputStream();
434 outstream.write("stat".getBytes());
435 outstream.flush();
436
437 Reader isr = new InputStreamReader(sock.getInputStream());
438 reader = new BufferedReader(isr);
439 String line = reader.readLine();
440 if (line != null && line.startsWith("Zookeeper version:")) {
441 return true;
442 }
443 } finally {
444 sock.close();
445 if (reader != null) {
446 reader.close();
447 }
448 }
449 } catch (IOException e) {
450
451 LOG.info("server localhost:" + port + " not up " + e);
452 }
453
454 if (System.currentTimeMillis() > start + timeout) {
455 break;
456 }
457 try {
458 Thread.sleep(250);
459 } catch (InterruptedException e) {
460 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
461 }
462 }
463 return false;
464 }
465
466 public int getClientPort() {
467 return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1
468 : clientPortList.get(activeZKServerIndex);
469 }
470 }